
// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.

#include "userkey_index.h"
#include "tree_void_ref2.h"

namespace rocksdb {

UserKeyIndex::UserKeyIndex(UserKeyIndexType type, ITreeVoidRef::RefType refType, Logger *log)
  : IUserKeyIndex(type, log), refType_(refType){
  treeVoidRef_dlink_ = new ConcurrentDLink<ITreeVoidRef *>();
}
UserKeyIndex::~UserKeyIndex() { delete treeVoidRef_dlink_; }

ITreeVoidRef *UserKeyIndex::getTreeVoidRef(const Slice &userKey, const VersionNode *node) {
  ITreeVoidRef* curRef = nullptr;
  switch (refType_){
  case ITreeVoidRef::RefType::REFTYPE1 :
    curRef = new TreeVoidRef();
    break;
  case ITreeVoidRef::RefType::REFTYPE2 :
    curRef = new TreeVoidRef2();
    break;
  default:
    assert(false);
    break;
  }
//  auto curRef = new ITreeVoidRef(refType_);
  curRef->setContentList(const_cast<VersionNode*>(node));
  void *retRef = nullptr;
  // nextTID is not absolutely greater than key, so add compare logic.
  // if ARTRef not exists, we will put it onto ART tree.
  bool insertSuc = tree_->insertNoReplace(userKey, curRef, retRef);

  // key has ARTRef object on ART tree.
  if (!insertSuc) {
    curRef->setContentList(nullptr);
    delete curRef;
    return (ITreeVoidRef *)retRef;
  }

  ITreeVoidRef *nextRef = nullptr;
  if (retRef != 0) {
    nextRef = (ITreeVoidRef *)(retRef);
    if ((nextRef->Next() == nullptr || nextRef->Prev() == nullptr)) {
      if (parseKeyFromTreeVoidRef(nextRef).compare(userKey) <= 0) {
        // next < key  and  next has not inserted into double link list.
        // need redo get larger ARTRef object from ART tree.
        retRef = this->tree_->getGT(userKey);
        nextRef = (ITreeVoidRef *)(retRef);
      }
    }
  }else{
    retRef = this->tree_->getGT(userKey);
    nextRef = (ITreeVoidRef *)(retRef);
  }
  if (nextRef == nullptr)
    nextRef = treeVoidRef_dlink_->getTail();

  // insert current ARTRef object into double link list.
  treeVoidRef_dlink_->insert(
      curRef, nextRef, [&userKey](ITreeVoidRef *a) -> int {
        return parseKeyFromTreeVoidRef(a).compare(userKey);
      });

  return curRef;
}

ITreeVoidRef *UserKeyIndex::getGETreeVoidRef(const Slice &userKey) {
  void *eorg = tree_->getGE(userKey);
  ITreeVoidRef *retRef = nullptr;
  if (eorg != 0) {
    retRef = (ITreeVoidRef *)(eorg);
  }

  return retRef;
}

ITreeVoidRef *UserKeyIndex::getGLTreeVoidRef(const Slice &userKey) {
  void *eorg = tree_->getGT(userKey);
  ITreeVoidRef *retRef = nullptr;
  if (eorg != 0) {
    retRef = (ITreeVoidRef *)(eorg);
  }

  return retRef;
}

} // namespace rocksdb
